Dify Plugin SDK 是一个完整的插件框架系统,用于开发和运行 Dify 平台的插件。支持多种插件类型(工具、模型、端点、代理、数据源)和三种部署模式(Local、Remote、Serverless)。
交互架构
核心文件位置:
- 主入口:
dify_plugin/plugin.py
- 注册管理:
dify_plugin/core/plugin_registration.py
- 执行器:
dify_plugin/core/plugin_executor.py
- IO服务器:
dify_plugin/core/server/io_server.py
- 路由器:
dify_plugin/core/server/router.py
核心类继承关系

启动流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| [开始] 用户创建 Plugin 实例 │ ├─→ [步骤1] Plugin.__init__(config: DifyPluginEnv) │ │ │ ├─→ [步骤1.1] 创建 PluginRegistration 实例 │ │ │ │ │ ├─→ [步骤1.1.1] _load_plugin_configuration() │ │ │ └─→ 读取 manifest.yaml │ │ │ └─→ 读取各组件配置文件 (*.yaml) │ │ │ │ │ ├─→ [步骤1.1.2] _resolve_plugin_cls() │ │ │ ├─→ _resolve_tool_providers() │ │ │ │ └─→ 动态导入工具类 │ │ │ ├─→ _resolve_model_providers() │ │ │ │ └─→ 动态导入模型类 │ │ │ ├─→ _resolve_endpoints() │ │ │ │ └─→ 动态导入端点类 │ │ │ ├─→ _resolve_agent_providers() │ │ │ │ └─→ 动态导入代理类 │ │ │ └─→ _resolve_datasource_providers() │ │ │ └─→ 动态导入数据源类 │ │ │ │ │ └─→ [步骤1.1.3] _load_plugin_assets() │ │ └─→ 加载 _assets 目录下的资源文件 │ │ │ ├─→ [步骤1.2] 根据安装方式创建通信流 │ │ ├─→ InstallMethod.LOCAL → _launch_local_stream() │ │ │ └─→ 创建 StdioRequestReader 和 StdioResponseWriter │ │ ├─→ InstallMethod.REMOTE → _launch_remote_stream() │ │ │ └─→ 创建 TCPReaderWriter │ │ │ └─→ 发送初始化消息(配置、工具声明、模型声明) │ │ └─→ InstallMethod.SERVERLESS → _launch_serverless_stream() │ │ └─→ 创建 ServerlessRequestReader │ │ └─→ 启动 HTTP 服务器 │ │ │ ├─→ [步骤1.3] 创建 PluginExecutor 实例 │ │ └─→ 传入 config 和 registration │ │ │ ├─→ [步骤1.4] 初始化 IOServer 和 Router │ │ ├─→ IOServer.__init__() │ │ │ └─→ 创建线程池 (ThreadPoolExecutor) │ │ └─→ Router.__init__() │ │ │ └─→ [步骤1.5] _register_request_routes() │ ├─→ 注册工具路由 (invoke_tool) │ ├─→ 注册模型路由 (invoke_llm, invoke_text_embedding, etc.) │ ├─→ 注册端点路由 (invoke_endpoint) │ ├─→ 注册代理路由 (invoke_agent_strategy) │ └─→ 注册数据源路由 (validate_datasource_credentials, etc.) │ └─→ [完成] Plugin 实例初始化完成,准备运行
|
1. Plugin.init(config)
- 文件:
dify_plugin/plugin.py
- 作用:初始化插件系统的所有组件
- 参数:
config - 包含安装方式、连接信息等配置
2. PluginRegistration.init(config)
- 文件:
dify_plugin/core/plugin_registration.py
- 作用:加载插件配置和动态加载所有插件类
- 关键方法:
_load_plugin_configuration() - 加载 YAML 配置
_resolve_plugin_cls() - 解析并导入 Python 类
_load_plugin_assets() - 加载静态资源
3. 通信流创建
_launch_local_stream() - 标准输入输出模式
_launch_remote_stream() - TCP 连接模式
_launch_serverless_stream() - HTTP 服务器模式
监听流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| [开始] Plugin.run() 被调用 │ └─→ IOServer._run() │ ├─→ [线程1] _setup_instruction_listener() │ │ │ └─→ [循环] while True: │ ├─→ request_reader.read() │ │ └─→ 阻塞等待请求到达 │ │ │ ├─→ 解析请求数据 │ │ │ └─→ executer.submit(_execute_request_in_thread, data) │ └─→ 提交到线程池异步执行 │ ├─→ [线程2] request_reader.event_loop() │ └─→ 请求读取器的事件循环(如果支持) │ ├─→ [线程3] _heartbeat() (仅 Remote 模式) │ └─→ [循环] 每隔一段时间发送心跳消息 │ └─→ default_writer.write(heartbeat_message) │ └─→ [线程4] _parent_alive_check() (仅 Local 模式) └─→ [循环] 检查父进程是否存活 ├─→ 如果父进程退出 → 终止插件进程 └─→ 否则继续等待
|
1. Plugin.run()
- 文件:
dify_plugin/plugin.py
- 作用:启动插件运行,调用 IOServer._run()
2. IOServer._run()
- 文件:
dify_plugin/core/server/io_server.py
- 作用:启动多个监听线程,处理请求和维护连接
3. _setup_instruction_listener()
- 作用:主监听循环,接收请求并提交到线程池
- 流程:
- 调用
request_reader.read() 阻塞等待
- 收到请求后解析数据
- 提交到线程池执行
_execute_request_in_thread()
4. request_reader 的实现
- StdioRequestReader - 从标准输入读取
- TCPReaderWriter - 从 TCP 连接读取
- ServerlessRequestReader - 从 HTTP 请求读取
运行流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| [请求到达] request_reader.read() 接收到请求 │ └─→ [步骤1] _execute_request_in_thread(data) │ ├─→ [步骤2] 创建 Session 对象 │ └─→ Session(request_id, response_writer) │ ├─→ [步骤3] Router.dispatch(session, data) │ │ │ ├─→ [步骤3.1] 解析请求类型 │ │ └─→ 从 data 中提取 request_type │ │ │ ├─→ [步骤3.2] 查找路由处理函数 │ │ └─→ 从路由表中匹配 request_type │ │ │ └─→ [步骤3.3] 调用对应的处理函数 │ ├─→ invoke_tool → PluginExecutor.invoke_tool() │ ├─→ invoke_llm → PluginExecutor.invoke_llm() │ ├─→ invoke_endpoint → PluginExecutor.invoke_endpoint() │ └─→ 其他请求类型... │ ├─→ [步骤4] PluginExecutor 执行具体功能 │ │ │ ├─→ [步骤4.1] 从 registration 获取对应的类 │ │ └─→ registration.get_tool_provider(provider_name) │ │ │ ├─→ [步骤4.2] 实例化类并调用方法 │ │ └─→ tool_instance._invoke(...) │ │ │ └─→ [步骤4.3] 处理返回结果 │ ├─→ 普通结果 → 直接返回 │ ├─→ 生成器 → 流式返回 │ └─→ 文件 → 分块传输 │ └─→ [步骤5] 发送响应 │ ├─→ [情况1] 流式响应 │ └─→ for chunk in result: │ └─→ session.write(chunk) │ ├─→ [情况2] 二进制文件 │ └─→ 分块读取并发送 │ └─→ session.write_file(file_data) │ └─→ [情况3] 普通响应 └─→ session.write(response) └─→ session.end()
|
1. _execute_request_in_thread(data)
- 文件:
dify_plugin/core/server/io_server.py
- 作用:在线程池中执行请求处理
- 流程:创建 Session → 路由分发 → 执行 → 响应
2. Router.dispatch(session, data)
- 文件:
dify_plugin/core/server/router.py
- 作用:根据请求类型分发到对应的处理函数
- 路由表:
self._routes 字典,key 为请求类型,value 为处理函数
3. PluginExecutor 的执行方法
invoke_tool() - 执行工具调用
invoke_llm() - 执行大语言模型调用
invoke_text_embedding() - 执行文本嵌入
invoke_rerank() - 执行重排序
invoke_tts() - 执行文本转语音
invoke_speech2text() - 执行语音转文本
invoke_moderation() - 执行内容审核
invoke_endpoint() - 执行端点调用
invoke_agent_strategy() - 执行代理策略
4. Session 对象
作用:封装单次请求的上下文
方法:
write(data) - 发送响应数据
write_file(file) - 发送文件数据
end() - 结束响应
交互流程
1. Local 模式(标准输入输出)
1 2 3
| if InstallMethod.Local == config.INSTALL_METHOD: request_reader, response_writer = self._launch_local_stream(config)
|
_launch_local_stream 中创建标准输入输出的读取器和写入器
1 2 3 4 5 6
| reader = StdioRequestReader() writer = StdioResponseWriter()
writer.write(self.registration.configuration.model_dump_json() + "\n\n")
|
之后便是从标准输入中读取到数据,然后处理请求逻辑,之后通过标准输出返回结果
数据流向:
1 2 3 4 5 6 7
| Dify plugin daemon ──[请求]──> 插件stdin ──> StdioRequestReader.read() │ ▼ 处理请求逻辑 │ ▼ Dify plugin daemon <──[响应]── 插件stdout <── StdioResponseWriter.write()
|
2. Remote 模式(TCP 连接)
1 2 3
| if InstallMethod.Remote == config.INSTALL_METHOD: request_reader, response_writer = self._launch_remote_stream(config)
|
_launch_remote_stream 逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| install_host, install_port = self._get_remote_install_host_and_port(config)
tcp_stream = TCPReaderWriter( install_host, install_port, config.REMOTE_INSTALL_KEY, on_connected=lambda: self._initialize_tcp_stream(tcp_stream), )
tcp_stream.launch()
|
tcp_stream.launch() 用于建立连接,连接成功后,会执行回调函数 _initialize_tcp_stream 向远程 Dify plugin daemon 实例发送插件的完整配置信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| tcp_stream.write( InitializeMessage( type=InitializeMessage.Type.MANIFEST_DECLARATION, data=self.registration.configuration.model_dump(), ).model_dump_json() + "\n\n" )
if self.registration.tools_configuration: tcp_stream.write( InitializeMessage( type=InitializeMessage.Type.TOOL_DECLARATION, data=List(root=self.registration.tools_configuration).model_dump(), ).model_dump_json() + "\n\n" ) ......
|
数据流向:
1 2 3 4 5 6 7
| Dify plugin daemon ──[TCP请求]──> TCPReaderWriter.read() │ ▼ 处理请求逻辑 │ ▼ Dify plugin daemon <──[TCP响应]── TCPReaderWriter.write()
|
3. Serverless 模式(HTTP 服务器)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| [Dify plugin daemon] [插件 HTTP 服务] │ │ ├─→ [步骤1] 插件启动 HTTP 服务 │ │ ├─→ 启动 Flask/HTTP 服务器 │ └─→ 监听 HTTP 端口 │ ├─→ [步骤2] 发送 HTTP 请求 │ │ └─→ POST /invoke │ │ ├─→ Headers │ │ └─→ JSON Body │ │ │ │ ├─→ [步骤3] 接收请求 │ │ └─→ ServerlessRequestReader │ │ │ ├─→ [步骤4] 处理请求 │ │ └─→ 路由分发 → 执行 │ │ └─→ [步骤5] 接收 HTTP 响应 ←─────────┤ ├─→ 状态码 200 │ └─→ JSON 响应体 │
|
数据流向:
1 2 3 4 5 6 7
| Dify plugin daemon ──[HTTP POST]──> ServerlessRequestReader.read() │ ▼ 处理请求逻辑 │ ▼ Dify plugin daemon <──[HTTP Response]── 直接返回 HTTP 响应
|
请求格式示例:
1 2 3 4 5 6 7 8 9 10 11
| { "request_id": "uuid-1234-5678", "request_type": "invoke_tool", "data": { "provider": "google", "tool_name": "search", "parameters": { "query": "Python tutorial" } } }
|
响应格式示例:
1 2 3 4 5 6 7
| { "request_id": "uuid-1234-5678", "status": "success", "data": { "result": "搜索结果..." } }
|
完整调用链
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
| 用户代码 │ ├─→ Plugin(config) │ │ │ ├─→ PluginRegistration(config) │ │ ├─→ _load_plugin_configuration() │ │ │ ├─→ yaml.safe_load() │ │ │ └─→ 读取各组件配置 │ │ │ │ │ ├─→ _resolve_plugin_cls() │ │ │ ├─→ _resolve_tool_providers() │ │ │ │ └─→ importlib.import_module() │ │ │ ├─→ _resolve_model_providers() │ │ │ ├─→ _resolve_endpoints() │ │ │ ├─→ _resolve_agent_providers() │ │ │ └─→ _resolve_datasource_providers() │ │ │ │ │ └─→ _load_plugin_assets() │ │ │ ├─→ _launch_*_stream() │ │ ├─→ StdioRequestReader() / TCPReaderWriter() / ServerlessRequestReader() │ │ └─→ StdioResponseWriter() / TCPReaderWriter() │ │ │ ├─→ PluginExecutor(config, registration) │ │ │ ├─→ IOServer.__init__() │ │ └─→ ThreadPoolExecutor() │ │ │ ├─→ Router.__init__() │ │ │ └─→ _register_request_routes() │ └─→ self.register_route(type, handler) │ └─→ plugin.run() │ └─→ IOServer._run() │ ├─→ Thread(_setup_instruction_listener) │ └─→ while True: │ ├─→ request_reader.read() │ └─→ executer.submit(_execute_request_in_thread) │ │ │ └─→ _execute_request() │ │ │ ├─→ Session(request_id, writer) │ │ │ ├─→ Router.dispatch(session, data) │ │ │ │ │ ├─→ 查找路由表 │ │ │ │ │ └─→ handler(session, data) │ │ │ │ │ └─→ PluginExecutor.invoke_*() │ │ │ │ │ ├─→ registration.get_*_provider() │ │ │ │ │ ├─→ provider_instance._invoke() │ │ │ └─→ 用户实现的插件逻辑 │ │ │ │ │ └─→ 返回结果 │ │ │ └─→ 发送响应 │ ├─→ session.write(data) │ │ └─→ response_writer.write() │ │ │ └─→ session.end() │ ├─→ Thread(request_reader.event_loop) │ ├─→ Thread(_heartbeat) │ └─→ Thread(_parent_alive_check)
|
核心类方法调用关系
Plugin 类:
1 2 3 4 5 6 7 8 9
| Plugin ├─→ __init__(config) │ ├─→ _launch_local_stream() │ ├─→ _launch_remote_stream() │ ├─→ _launch_serverless_stream() │ └─→ _register_request_routes() │ └─→ run() └─→ IOServer._run()
|
PluginRegistration 类:
1 2 3 4 5 6 7 8 9 10 11
| PluginRegistration ├─→ __init__(config) │ ├─→ _load_plugin_configuration() │ ├─→ _resolve_plugin_cls() │ └─→ _load_plugin_assets() │ ├─→ get_tool_provider(name) ├─→ get_model_provider(name) ├─→ get_endpoint(name) ├─→ get_agent_provider(name) └─→ get_datasource_provider(name)
|
PluginExecutor 类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| PluginExecutor ├─→ __init__(config, registration) │ ├─→ invoke_tool(session, data) │ ├─→ registration.get_tool_provider() │ └─→ tool._invoke() │ ├─→ invoke_llm(session, data) │ ├─→ registration.get_model_provider() │ └─→ model._invoke() │ ├─→ invoke_endpoint(session, data) │ ├─→ registration.get_endpoint() │ └─→ endpoint._invoke() │ └─→ 其他 invoke_* 方法...
|
IOServer 类:
1 2 3 4 5 6 7 8 9 10 11
| IOServer ├─→ __init__() │ └─→ ThreadPoolExecutor() │ ├─→ _run() │ ├─→ _setup_instruction_listener() │ ├─→ _heartbeat() │ └─→ _parent_alive_check() │ └─→ _execute_request(session, data) └─→ Router.dispatch()
|
Router 类:
1 2 3 4 5 6 7 8 9 10 11
| Router ├─→ __init__() │ └─→ self._routes = {} │ ├─→ register_route(type, handler) │ └─→ self._routes[type] = handler │ └─→ dispatch(session, data) ├─→ 解析 request_type ├─→ 查找 self._routes[request_type] └─→ handler(session, data)
|
插件接口
继承关系图
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| ToolLike[T] (泛型基类) ├── Tool (工具实现) │ └── [具体工具实现] │ └── AgentStrategy (代理策略) └── [具体代理策略实现]
ToolProvider (工具提供商) └── AgentProvider (代理提供商)
ModelProvider (模型提供商) └── [具体模型提供商实现] ↓ (通过 ModelFactory 创建) AIModel (AI 模型基类) ├── LargeLanguageModel ├── TextEmbeddingModel ├── RerankModel ├── Speech2TextModel ├── TTSModel └── ModerationModel
DatasourceProvider (数据源提供商) ├── WebsiteDatasource ├── OnlineDocumentDatasource └── OnlineDriveDatasource
Endpoint (端点基类) └── [具体端点实现]
|